package io.sundial.executor.impl;

import io.sundial.coordination.CallbackException;
import io.sundial.coordination.Coordinator;
import io.sundial.core.Callback;
import io.sundial.core.context.Context;
import io.sundial.core.event.EventListener;
import io.sundial.core.lifecycle.exception.InitializingException;
import io.sundial.executor.event.*;
import io.sundial.job.JobCommand;
import io.sundial.job.JobException;
import io.sundial.job.JobResult;
import io.sundial.job.JobStatus;

import java.io.PrintWriter;
import java.io.StringWriter;

/**
 * Executor that replies job state changes through the coordinator.
 * <p>
 * While initializing, it registers one listener per job event type (submitted, rejected,
 * canceled, executing, completed, failed). Every listener asks the coordinator to create a
 * node whose path is {@code jobPath + SPRT + JobStatus}; the completed and failed listeners
 * additionally attach a marshalled payload.
 *
 * @author Payne 646742615@qq.com
 * 2018/12/26 11:04
 */
public abstract class ReplyingExecutor extends DiscoveringExecutor {

    /**
     * Runs the parent initialization and then registers the job event listeners.
     *
     * @param context the context handed to the parent initialization
     * @throws InitializingException propagated from the parent initialization
     */
    @Override
    protected void initializing(Context context) throws InitializingException {
        super.initializing(context);

        // region job event listeners
        addEventListener(new JobSubmittedEventListener());
        addEventListener(new JobRejectedEventListener());
        addEventListener(new JobCanceledEventListener());
        addEventListener(new JobExecutingEventListener());
        addEventListener(new JobCompletedEventListener());
        addEventListener(new JobFailedEventListener());
        // endregion
    }

    /**
     * Shared base of all replying listeners: it is the callback handed to the coordinator and
     * it logs the outcome of each create request.
     */
    private abstract class JobReplyingEventListener implements Callback<Coordinator.CreateResult, CallbackException> {

        /**
         * Logs the outcome of a create request: info level on success, warn level (with the
         * exception attached) otherwise. A {@code null} result is logged with a null path.
         *
         * @param success   whether the create request succeeded
         * @param result    the create result, may be {@code null}
         * @param exception the failure reported by the coordinator, logged only when not successful
         */
        @Override
        public void call(boolean success, Coordinator.CreateResult result, CallbackException exception) {
            String template = success
                    ? "job state for %s replied success"
                    : "job state for %s replied fail";
            String message = String.format(template, result == null ? null : result.getPath());
            if (success) {
                logger.info(message);
                return;
            }
            logger.warn(message, exception);
        }

        /**
         * Asks the coordinator to create the given state node, using this listener as callback.
         * The two boolean flags are passed as {@code true, false}; their meaning is defined by
         * the coordinator's create method, which is not visible in this file.
         *
         * @param statePath full path of the state node to create
         * @param payload   data to store with the node, may be {@code null}
         * @throws Exception whatever the coordinator call throws
         */
        protected void replyState(String statePath, byte[] payload) throws Exception {
            coordinator.create(statePath, payload, true, false, this);
        }
    }

    /**
     * Replies the {@code SUBMITTED} state, without payload, when a job is submitted.
     */
    private class JobSubmittedEventListener extends JobReplyingEventListener implements EventListener<JobSubmittedEvent> {

        @Override
        public void onListened(JobSubmittedEvent event) throws Exception {
            String path = event.getJobCommand().getJobPath();
            replyState(path + SPRT + JobStatus.SUBMITTED, null);
        }
    }

    /**
     * Replies the {@code REJECTED} state, without payload, when a job is rejected.
     */
    private class JobRejectedEventListener extends JobReplyingEventListener implements EventListener<JobRejectedEvent> {

        @Override
        public void onListened(JobRejectedEvent event) throws Exception {
            String path = event.getJobCommand().getJobPath();
            replyState(path + SPRT + JobStatus.REJECTED, null);
        }
    }

    /**
     * Replies the {@code CANCELED} state, without payload, when a job is canceled.
     */
    private class JobCanceledEventListener extends JobReplyingEventListener implements EventListener<JobCanceledEvent> {

        @Override
        public void onListened(JobCanceledEvent event) throws Exception {
            String path = event.getJobCommand().getJobPath();
            replyState(path + SPRT + JobStatus.CANCELED, null);
        }
    }

    /**
     * Replies the {@code EXECUTING} state, without payload, when a job starts executing.
     */
    private class JobExecutingEventListener extends JobReplyingEventListener implements EventListener<JobExecutingEvent> {

        @Override
        public void onListened(JobExecutingEvent event) throws Exception {
            String path = event.getJobCommand().getJobPath();
            replyState(path + SPRT + JobStatus.EXECUTING, null);
        }
    }

    /**
     * Replies the {@code COMPLETED} state together with the marshalled job result.
     * <p>
     * The reply is issued from a {@code finally} block, so it is still sent (with a
     * {@code null} payload) when marshalling fails; that failure is logged and rethrown.
     */
    private class JobCompletedEventListener extends JobReplyingEventListener implements EventListener<JobCompletedEvent> {

        @Override
        public void onListened(JobCompletedEvent event) throws Exception {
            String path = event.getJobCommand().getJobPath();
            String statePath = path + SPRT + JobStatus.COMPLETED;
            byte[] payload = null;
            try {
                JobResult outcome = event.getJobResult();
                payload = marshall(outcome);
            } catch (Exception failure) {
                logger.error("error occurred while replying job completed state", failure);
                throw failure;
            } finally {
                // Reply regardless of whether the payload could be produced.
                replyState(statePath, payload);
            }
        }
    }

    /**
     * Replies the {@code FAILED} state together with the marshalled stack trace text of the
     * job exception, when the event carries one.
     * <p>
     * The reply is issued from a {@code finally} block, so it is still sent (with whatever
     * payload was produced, {@code null} if none) when building the payload fails; that
     * failure is logged and rethrown.
     */
    private class JobFailedEventListener extends JobReplyingEventListener implements EventListener<JobFailedEvent> {

        @Override
        public void onListened(JobFailedEvent event) throws Exception {
            String path = event.getJobCommand().getJobPath();
            String statePath = path + SPRT + JobStatus.FAILED;
            byte[] payload = null;
            try (
                    StringWriter buffer = new StringWriter();
                    PrintWriter printer = new PrintWriter(buffer)
            ) {
                //noinspection ThrowableNotThrown
                JobException cause = event.getJobException();
                if (cause != null) {
                    // Render the stack trace as text and marshal that text as the payload.
                    cause.printStackTrace(printer);
                    printer.flush();
                    payload = marshall(buffer.toString());
                }
            } catch (Exception failure) {
                logger.error("error occurred while replying job failed state", failure);
                throw failure;
            } finally {
                // Reply regardless of whether the payload could be produced.
                replyState(statePath, payload);
            }
        }
    }


}
